package com.heima.kafka.sample;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 生产者
 */
public class ProducerQuickStart {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1kafka链接配置信息
        Properties prop = new Properties();
        //kafka链接地址
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //key和value的序列化
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //ack配置，消息确认机制,默认ack==1
        prop.put(ProducerConfig.ACKS_CONFIG,"all");

        //设置重试次数，默认0次，指定每发送失败重试多少次
        prop.put(ProducerConfig.RETRIES_CONFIG, 10);

        //数据压缩
        prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");


        //2创建kafka生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);

        //3发送消息
        /**
         * 第一个参数： topic
         * 第二个参数： 消息的key
         * 第三个参数： 消息的value
         */
        for(int i=0;i<5;i++) {
            ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<String, String>("itcast-topic-input", "hello kafka");
            producer.send(kvProducerRecord);
        }
        //同步发送消息
//        RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
//        System.out.println(recordMetadata.offset());




//        //异步消息发送
//        producer.send(kvProducerRecord, new Callback() {
//                    @Override
//                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//                        if (e != null) {
//                            System.out.println("记录消息到日志表中");
//                        }
//                        System.out.println(recordMetadata.offset());
//                    }
//                });
//
//
                //4关闭消息通道，必须要关闭，否则消息发送不成功
                producer.close();

    }
}
